package producer

import (
	"github.com/Shopify/sarama"
	"time"
)




type OnMessageCallback func(sarama.AsyncProducer)
//async producer
type FinderKafkaProducer struct {
	Producer sarama.AsyncProducer
	brokers []string

}

func NewFinderKafkaProducer(brokers []string) (*FinderKafkaProducer,error) {
	kafka:=&FinderKafkaProducer{brokers:brokers}


	config := sarama.NewConfig()
	//等待服务器所有副本都保存成功后的响应
	config.Producer.RequiredAcks = sarama.WaitForAll
	//随机向partition发送消息
	config.Producer.Partitioner = sarama.NewRandomPartitioner

	//是否等待成功和失败后的响应,只有上面的RequireAcks设置不是NoReponse这里才有用.
	config.Producer.Return.Successes = true
	config.Producer.Return.Errors = true

	config.Producer.Timeout = 5 * time.Second

	//设置使用的kafka版本,如果低于V0_10_0_0版本,消息中的timestrap没有作用.需要消费和生产同时配置
	//注意，版本设置不对的话，kafka会返回很奇怪的错误，并且无法成功发送消息
	config.Version = sarama.V0_10_0_1


	p, err := sarama.NewAsyncProducer(brokers, config)
	kafka.Producer=p

	if err!=nil{
		return nil ,err
	}else{
		return kafka,nil
	}
}

//生产
func (this *FinderKafkaProducer) OnProduce(topic string,data interface{},onMessageCallback OnMessageCallback)  {


	// 发送的消息,主题。
	// 注意：这里的msg必须得是新构建的变量，不然你会发现发送过去的消息内容都是一样的，因为批次发送消息的关系。


	if onMessageCallback==nil{
		//必须有这个匿名函数内容
		/*go func(p sarama.AsyncProducer) {
			errors := p.Errors()
			success := p.Successes()
			for {
				select {
				case err := <-errors:
					if err != nil {
						log.Error(err)
					}
				case msg:=<-success:

					log.Info("发送到kafka成功 %+v",msg)
				}
			}
		}(this.Producer)*/
	}else{
		go onMessageCallback(this.Producer)
	}


	msg:=&sarama.ProducerMessage{}
	msg.Topic=topic

	switch data.(type) {
	case string:
		msg.Value=sarama.ByteEncoder(data.(string))
	case []byte:
		msg.Value=sarama.ByteEncoder(data.([]byte))
	default:

	}

	//使用通道发送
	this.Producer.Input() <- msg
}

